After creating the ETL Spark Job, we need to add the actual data-processing logic. The default generated script only moves data from source to target; this time our goal is to find the top five most frequently purchased products for each user, which we will implement with PySpark.
The walkthrough below uses the modified script. The following is the PySpark code that computes each user's top five products by purchase count.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
import pyspark.sql.functions as F
from pyspark.sql.window import Window
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "it_db", table_name = "order_products_prior", transformation_ctx = "datasource0"]
## @return: order_products_prior
## @inputs: []
order_products_prior = glueContext.create_dynamic_frame.from_catalog(database = "it_db", table_name = "order_products_prior", transformation_ctx = "order_products_prior")
## @type: DataSource
## @args: [database = "sampledata", table_name = "order", transformation_ctx = "order"]
## @return: order
## @inputs: []
order = glueContext.create_dynamic_frame.from_catalog(database = "it_db", table_name = "order", transformation_ctx = "order")
## @type: DataSource
## @args: [database = "sampledata", table_name = "products", transformation_ctx = "products"]
## @return: products
## @inputs: []
products = glueContext.create_dynamic_frame.from_catalog(database = "it_db", table_name = "products", transformation_ctx = "products")
## @type: Join
## @args: [keys1 = ["order_id"], keys2 = ["order_id"]]
## @return: join_order
## @inputs: [frame1 = order_products_prior, frame2 = order]
join_order = Join.apply(frame1 = order_products_prior, frame2 = order, keys1 = ["order_id"], keys2 = ["order_id"], transformation_ctx = "joindata")
## @type: Join
## @args: [keys1 = ["product_id"], keys2 = ["product_id"]]
## @return: join_products
## @inputs: [frame1 = join_order, frame2 = products]
join_products = Join.apply(frame1 = join_order, frame2 = products, keys1 = ["product_id"], keys2 = ["product_id"], transformation_ctx = "joindata_products").toDF()
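The two joins above produce a single DataFrame that links every order line to its user and product name, which is why the script imports pyspark.sql.functions and Window. What follows is a minimal sketch of the top-five computation, assuming the joined DataFrame exposes the Instacart-style columns user_id, product_id, and product_name (adjust the names if your catalog differs): it counts purchases per user and product, ranks the products within each user with a window function, keeps only ranks 1 through 5, and converts the result back to a DynamicFrame so the rest of the Glue job can write it out.
## Count how many times each user bought each product (column names are assumptions from the Instacart schema)
user_product_counts = join_products \
    .groupBy("user_id", "product_id", "product_name") \
    .agg(F.count("*").alias("purchase_count"))
## Rank products per user by purchase count, highest first
user_window = Window.partitionBy("user_id").orderBy(F.desc("purchase_count"))
top5_per_user = user_product_counts \
    .withColumn("rank", F.row_number().over(user_window)) \
    .filter(F.col("rank") <= 5)
## Convert back to a DynamicFrame so Glue can write it to the target
top5_dynamic_frame = DynamicFrame.fromDF(top5_per_user, glueContext, "top5_dynamic_frame")
Using F.row_number() guarantees exactly five rows per user even when purchase counts tie; if you prefer to keep all tied products, F.rank() or F.dense_rank() can be swapped in instead.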